home *** CD-ROM | disk | FTP | other *** search
/ Amiga Tools 4 / Amiga Tools 4.iso / tools / dfue / term 4.6(?) / extras / source / term-source.lha / MsgQueue.c < prev    next >
C/C++ Source or Header  |  1996-03-18  |  7KB  |  400 lines

  1. /*
  2. **    MsgQueue.c
  3. **
  4. **    Special MsgPort like queue management
  5. **
  6. **    Copyright © 1990-1996 by Olaf `Olsen' Barthel
  7. **        All Rights Reserved
  8. */
  9.  
  10. #ifndef _GLOBAL_H
  11. #include "Global.h"
  12. #endif
  13.  
  14.     /* DestroyPooled(struct MsgItem *Item):
  15.      *
  16.      *    Default destructor for CreateMsgItem().
  17.      */
  18.  
  19. STATIC VOID __stdargs
  20. DestroyPooled(struct MsgItem *Item)
  21. {
  22.     FreeVecPooled(Item);
  23. }
  24.  
  25.     /* GetMsgItem(struct MsgQueue *Queue):
  26.      *
  27.      *    Get the next message from the queue handle,
  28.      *    similar to GetMsg().
  29.      */
  30.  
  31. APTR
  32. GetMsgItem(struct MsgQueue *Queue)
  33. {
  34.         // Valid handle?
  35.  
  36.     if(Queue)
  37.     {
  38.         APTR Item;
  39.  
  40.             // Gain access
  41.  
  42.         ObtainSemaphore(&Queue -> Access);
  43.  
  44.             // Any item available?
  45.  
  46.         if(Queue -> MsgList . mlh_Head -> mln_Succ)
  47.         {
  48.                 // Remove the first item
  49.  
  50.             Remove((struct Node *)(Item = Queue -> MsgList . mlh_Head));
  51.  
  52.                 // Are there any tasks waiting for the queue to become smaller?
  53.  
  54.             if(Queue -> WaitList . mlh_Head -> mln_Succ)
  55.             {
  56.                 struct SemaphoreRequest *WaitRequest = (struct SemaphoreRequest *)Queue -> WaitList . mlh_Head;
  57.  
  58.                     // Remove one waiting task from the list
  59.  
  60.                 Remove((struct Node *)WaitRequest);
  61.  
  62.                     // Wake the task up
  63.  
  64.                 Signal(WaitRequest -> sr_Waiter,SIGF_SINGLE);
  65.             }
  66.  
  67.                 // One message taken
  68.  
  69.             if(Queue -> QueueSize)
  70.                 Queue -> QueueSize--;
  71.         }
  72.         else
  73.             Item = NULL;
  74.  
  75.             // Drop the semaphore
  76.  
  77.         ReleaseSemaphore(&Queue -> Access);
  78.  
  79.             // Return the item read
  80.  
  81.         return(Item);
  82.     }
  83.     else
  84.         return(NULL);
  85. }
  86.  
  87.     /* PutMsgItem(struct MsgQueue *Queue,struct MsgItem *Item):
  88.      *
  89.      *    Add a message item to a queue handle, similar to
  90.      *    PutMsg().
  91.      */
  92.  
  93. VOID
  94. PutMsgItem(struct MsgQueue *Queue,struct MsgItem *Item)
  95. {
  96.         // Valid data?
  97.  
  98.     if(Queue && Item)
  99.     {
  100.             // Gain access to the handle
  101.  
  102.         ObtainSemaphore(&Queue -> Access);
  103.  
  104.             // Are we to discard this message?
  105.  
  106.         if(Queue -> Discard)
  107.         {
  108.             ReleaseSemaphore(&Queue -> Access);
  109.  
  110.             DeleteMsgItem(Item);
  111.         }
  112.         else
  113.         {
  114.                 // Are we to watch for a maximum queue size?
  115.  
  116.             if(Queue -> MaxSize)
  117.             {
  118.                     // Maximum queue size already reached?
  119.  
  120.                 if(Queue -> MaxSize == Queue -> QueueSize)
  121.                 {
  122.                     struct SemaphoreRequest WaitRequest;
  123.  
  124.                         // That's me
  125.  
  126.                     WaitRequest . sr_Waiter = FindTask(NULL);
  127.  
  128.                         // Add this task to the waiting list
  129.  
  130.                     AddTail((struct List *)&Queue -> WaitList,(struct Node *)&WaitRequest);
  131.  
  132.                         // Careful, please
  133.  
  134.                     Forbid();
  135.  
  136.                         // Drop the semaphore
  137.  
  138.                     ReleaseSemaphore(&Queue -> Access);
  139.  
  140.                         // Clear the one-shot flag
  141.  
  142.                     ClrSignal(SIGF_SINGLE);
  143.  
  144.                         // Wait for the queue to become smaller
  145.  
  146.                     Wait(SIGF_SINGLE);
  147.  
  148.                         // Gain access to the handle
  149.  
  150.                     ObtainSemaphore(&Queue -> Access);
  151.  
  152.                         // Reenable multitasking
  153.  
  154.                     Permit();
  155.                 }
  156.  
  157.                     // We are going to make the queue longer
  158.  
  159.                 Queue -> QueueSize++;
  160.             }
  161.  
  162.                 // Add the item to the list
  163.  
  164.             AddTail((struct List *)&Queue -> MsgList,(struct Node *)Item);
  165.  
  166.                 // Wake up the owner
  167.  
  168.             Signal(Queue -> SigTask,Queue -> SigMask);
  169.  
  170.                 // Drop the semaphore
  171.  
  172.             ReleaseSemaphore(&Queue -> Access);
  173.         }
  174.     }
  175. }
  176.  
  177.     /* DeleteMsgItem(struct MsgItem *Item):
  178.      *
  179.      *    Clean up after a message item.
  180.      */
  181.  
  182. VOID
  183. DeleteMsgItem(struct MsgItem *Item)
  184. {
  185.         // Valid data?
  186.  
  187.     if(Item)
  188.     {
  189.             // Do we have a destructor for this item?
  190.  
  191.         if(Item -> Destructor)
  192.             (*Item -> Destructor)(Item);
  193.     }
  194. }
  195.  
  196.     /* CreateMsgItem(LONG Size):
  197.      *
  198.      *    Allocate a new message item.
  199.      */
  200.  
  201. struct MsgItem *
  202. CreateMsgItem(LONG Size)
  203. {
  204.     struct MsgItem    *Item;
  205.  
  206.         // Allocate space for the item and add the default destructor
  207.  
  208.     if(Item = (struct MsgItem *)AllocVecPooled(Size,MEMF_ANY))
  209.         Item -> Destructor = DestroyPooled;
  210.  
  211.     return(Item);
  212. }
  213.  
  214.     /* CreateMinMsgItem(LONG Size):
  215.      *
  216.      *    Allocate a new message item, make sure that the header
  217.      *    is present.
  218.      */
  219.  
  220. struct MsgItem *
  221. CreateMinMsgItem(LONG Size)
  222. {
  223.     struct MsgItem    *Item;
  224.  
  225.         // Allocate space for the item and add the default destructor
  226.  
  227.     if(Item = (struct MsgItem *)AllocVecPooled(sizeof(struct MsgItem) + Size,MEMF_ANY))
  228.         Item -> Destructor = DestroyPooled;
  229.  
  230.     return(Item);
  231. }
  232.  
  233.     /* UnlockMsgQueue(struct MsgQueue *Queue):
  234.      *
  235.      *    Wake up all tasks waiting for the queue to become
  236.      *    smaller. Note: this will change the maximum queue
  237.      *    size.
  238.      */
  239.  
  240. VOID
  241. UnlockMsgQueue(struct MsgQueue *Queue)
  242. {
  243.         // Valid data?
  244.  
  245.     if(Queue)
  246.     {
  247.         struct SemaphoreRequest *WaitRequest;
  248.  
  249.             // Gain access to the handle
  250.  
  251.         ObtainSemaphore(&Queue -> Access);
  252.  
  253.             // Notify all waiting tasks
  254.  
  255.         for(WaitRequest = (struct SemaphoreRequest *)Queue -> WaitList . mlh_Head ; WaitRequest -> sr_Link . mln_Succ ; WaitRequest = (struct SemaphoreRequest *)WaitRequest -> sr_Link . mln_Succ)
  256.             Signal(WaitRequest -> sr_Waiter,SIGF_SINGLE);
  257.  
  258.             // Reset the maximum queue size
  259.  
  260.         Queue -> MaxSize = 0;
  261.  
  262.             // Drop the semaphore
  263.  
  264.         ReleaseSemaphore(&Queue -> Access);
  265.     }
  266. }
  267.  
  268.     /* DeleteMsgQueue(struct MsgQueue *Queue):
  269.      *
  270.      *    Free a queue handle.
  271.      */
  272.  
  273. VOID
  274. DeleteMsgQueue(struct MsgQueue *Queue)
  275. {
  276.         // Valid data?
  277.  
  278.     if(Queue)
  279.     {
  280.         struct MsgItem *Item,*Next;
  281.  
  282.             // Make sure no tasks is waiting in the list
  283.  
  284.         UnlockMsgQueue(Queue);
  285.  
  286.             // Disable multitasking
  287.  
  288.         Forbid();
  289.  
  290.             // Clear the queue signal
  291.  
  292.         ClrSignal(Queue -> SigMask);
  293.  
  294.             // Wait until all tasks are finished
  295.  
  296.         while(Queue -> QueueSize)
  297.             Wait(Queue -> SigMask);
  298.  
  299.             // Reenable multitasking
  300.  
  301.         Permit();
  302.  
  303.             // Remove each item from the list
  304.  
  305.         for(Item = (struct MsgItem *)Queue -> MsgList . mlh_Head ; Next = (struct MsgItem *)Item -> Link . mln_Succ ; Item = Next)
  306.             DeleteMsgItem(Item);
  307.  
  308.             // Free the signal bit if necessary
  309.  
  310.         if(Queue -> SigBit != -1)
  311.             FreeSignal(Queue -> SigBit);
  312.  
  313.             // Free the handle
  314.  
  315.         FreeVecPooled(Queue);
  316.     }
  317. }
  318.  
  319.     /* CreateMsgQueue(ULONG SigMask,LONG MaxSize):
  320.      *
  321.      *    Allocate a queue handle.
  322.      */
  323.  
  324. struct MsgQueue *
  325. CreateMsgQueue(ULONG SigMask,LONG MaxSize)
  326. {
  327.     struct MsgQueue    *Queue;
  328.  
  329.         // Allocate the queue handle
  330.  
  331.     if(Queue = (struct MsgQueue *)AllocVecPooled(sizeof(struct MsgQueue),MEMF_ANY | MEMF_PUBLIC | MEMF_CLEAR))
  332.     {
  333.             // Initialize the access semaphore
  334.  
  335.         InitSemaphore(&Queue -> Access);
  336.  
  337.             // Initialize the message item list
  338.  
  339.         NewList((struct List *)&Queue -> MsgList);
  340.  
  341.             // Reset the size data
  342.  
  343.         Queue -> QueueSize    = 0;
  344.         Queue -> MaxSize    = MaxSize;
  345.  
  346.             // Initialize the queue wait list
  347.  
  348.         NewList((struct List *)&Queue -> WaitList);
  349.  
  350.             // Store the owner address
  351.  
  352.         Queue -> SigTask = FindTask(NULL);
  353.  
  354.             // Do we have a signal mask ready?
  355.  
  356.         if(SigMask)
  357.         {
  358.                 // Use it
  359.  
  360.             Queue -> SigMask    = SigMask;
  361.             Queue -> SigBit        = -1;
  362.         }
  363.         else
  364.         {
  365.                 // Allocate a new signal
  366.  
  367.             if((Queue -> SigBit = AllocSignal(-1)) == -1)
  368.             {
  369.                 FreeVecPooled(Queue);
  370.  
  371.                 return(NULL);
  372.             }
  373.             else
  374.                 Queue -> SigMask = (1L << Queue -> SigBit);
  375.         }
  376.     }
  377.  
  378.     return(Queue);
  379. }
  380.  
  381.     /* SetQueueDiscard(struct MsgQueue *Queue,BOOLEAN Mode):
  382.      *
  383.      *    Set whether new items should be added to the
  384.      *    queue or whether they should rather get
  385.      *    discarded instead.
  386.      */
  387.  
  388. VOID
  389. SetQueueDiscard(struct MsgQueue *Queue,BOOL Mode)
  390. {
  391.     if(Queue)
  392.     {
  393.         ObtainSemaphore(&Queue -> Access);
  394.  
  395.         Queue -> Discard = Mode;
  396.  
  397.         ReleaseSemaphore(&Queue -> Access);
  398.     }
  399. }
  400.